package cn.iocoder.springboot.lab31.rocketmqdemo.consumer;

import cn.iocoder.springboot.lab31.rocketmqdemo.producer.DemoPushProducer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * Concurrent RocketMQ message listener registered as a Spring component.
 *
 * <p>For each delivered batch it logs the batch size, decodes the first message body as text,
 * logs it together with its reconsume count, and asks for the delivery to be retried.
 *
 * <p>NOTE(review): the normal path always answers {@code RECONSUME_LATER}, so no message is ever
 * acknowledged as consumed by it. This looks like a deliberate retry demonstration — confirm
 * before reusing this class for real processing.
 */
@Component
public class DataPushListener2 implements MessageListenerConcurrently {

    private static final Logger log = LoggerFactory.getLogger(DataPushListener2.class);

    /** A message whose handling throws is given up on once its reconsume count exceeds this value. */
    private static final int MAX_RECONSUME_TIMES = 2;

    // Injected but not referenced anywhere in this class.
    @Autowired
    private DemoPushProducer producer;

    /**
     * Handles one batch of delivered messages.
     *
     * @param msgs    the delivered messages
     * @param context the consume context supplied by the client; not used here
     * @return {@code RECONSUME_LATER} after logging the first message, or when handling a message
     *         throws while its reconsume count is at most {@link #MAX_RECONSUME_TIMES};
     *         {@code CONSUME_SUCCESS} when the batch is empty or every message in it was skipped
     *         after exhausting its retries
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        log.info("consumeMessage coming size:{}", msgs.size());

        for (MessageExt msgExt : msgs) {
            try {
                // Decode with an explicit charset instead of the platform default.
                // Assumes the producer encodes payloads as UTF-8 — TODO confirm.
                String msg = new String(msgExt.getBody(), StandardCharsets.UTF_8);
                log.info("push服务:DataPushListener22->consumeMessage :{},getReconsumeTimes:{}", msg, msgExt.getReconsumeTimes());
                // Returns from inside the loop: only the first message of a batch is ever logged.
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            } catch (Exception e) {
                // The exception is passed as the trailing argument so SLF4J logs its stack trace.
                log.error("DataPushListener22-consumeMessage-exception, msgId:{}, reconsumeTimes:{}",
                        msgExt.getMsgId(), msgExt.getReconsumeTimes(), e);
                if (msgExt.getReconsumeTimes() > MAX_RECONSUME_TIMES) {
                    // Retries exhausted: give up on this message and move on to the next one.
                    continue;
                }
                // Retries remain: ask for redelivery instead of acknowledging a failed message.
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}
